package com.bw.gmall.realtime.dwd.db.split.function;


import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.common.bean.TableProcessDwd;
import com.bw.gmall.realtime.common.util.JdbcUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.util.*;

/**
 * Dynamic splitting of fact-table data: joins the main business-data stream with the
 * broadcast configuration stream and emits, for every record that has a matching
 * configuration entry, the record's {@code data} payload together with that entry.
 *
 * <p>Configuration entries are keyed by {@code sourceTable + ":" + sourceType}. They are
 * looked up first in the broadcast state and, if absent there, in a local map that is
 * preloaded from MySQL in {@link #open(Configuration)} and kept in sync with every
 * broadcast element.
 */
public class BaseDbTableProcessFunction extends BroadcastProcessFunction<JSONObject, TableProcessDwd, Tuple2<JSONObject,TableProcessDwd>> {

    /** Descriptor used to obtain the broadcast state that holds the configuration entries. */
    MapStateDescriptor<String, TableProcessDwd> mapStateDescriptor;

    /**
     * Local copy of the configuration, keyed like the broadcast state. Consulted by
     * {@code processElement} when the broadcast state has no entry for a key.
     */
    private Map<String,TableProcessDwd> configMap = new HashMap<>();


    /**
     * @param mapStateDescriptor descriptor of the broadcast state carrying the configuration
     */
    public BaseDbTableProcessFunction(MapStateDescriptor<String, TableProcessDwd> mapStateDescriptor) {
        this.mapStateDescriptor = mapStateDescriptor;
    }

    /**
     * Preloads every row of {@code gmall2023_config.table_process_dwd} into {@code configMap}.
     *
     * @param parameters Flink configuration (unused)
     * @throws Exception if the connection cannot be obtained or the query fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Connection mysqlConnection = JdbcUtil.getMysqlConnection();
        try {
            List<TableProcessDwd> tableProcessDwdList
                    = JdbcUtil.queryList(mysqlConnection, "select * from gmall2023_config.table_process_dwd", TableProcessDwd.class, true);
            for (TableProcessDwd tableProcessDwd : tableProcessDwdList) {
                String key = getKey(tableProcessDwd.getSourceTable(), tableProcessDwd.getSourceType());
                configMap.put(key, tableProcessDwd);
            }
        } finally {
            // Release the connection even when the query or the mapping above throws.
            JdbcUtil.closeConnection(mysqlConnection);
        }
    }

    /**
     * Builds the lookup key shared by the broadcast state and {@code configMap}.
     *
     * @param sourceTable business table name
     * @param sourceType  operation type on that table
     * @return {@code sourceTable + ":" + sourceType}
     */
    private String getKey(String sourceTable, String sourceType) {
        return sourceTable + ":" + sourceType;
    }

    /**
     * Handles one record of the main business-data stream.
     *
     * <p>Reads {@code table} and {@code type} from the record, looks up the matching
     * configuration and, if one exists, emits the record's {@code data} object (reduced to
     * the configured sink columns and enriched with the record's {@code ts}) paired with the
     * configuration entry. Records without a matching configuration are dropped.
     *
     * @param value the incoming record
     * @param ctx   read-only context giving access to the broadcast state
     * @param out   collector for the (data, configuration) pairs
     * @throws Exception if the broadcast state cannot be read
     */
    @Override
    public void processElement(JSONObject value, BroadcastProcessFunction<JSONObject, TableProcessDwd, Tuple2<JSONObject, TableProcessDwd>>.ReadOnlyContext ctx, Collector<Tuple2<JSONObject, TableProcessDwd>> out) throws Exception {
        String table = value.getString("table");
        String type = value.getString("type");
        String key = getKey(table, type);

        ReadOnlyBroadcastState<String, TableProcessDwd> broadcastState = ctx.getBroadcastState(mapStateDescriptor);

        // Broadcast state takes precedence; the preloaded map is the fallback.
        TableProcessDwd tp = broadcastState.get(key);
        if (tp == null) {
            tp = configMap.get(key);
        }
        if (tp == null) {
            // No configuration for this table/type: not a fact-table record we route.
            return;
        }

        JSONObject dataJsonObj = value.getJSONObject("data");
        if (dataJsonObj == null) {
            // Nothing to forward; avoids a NullPointerException on records without a data object.
            return;
        }

        // Keep only the columns the sink expects before passing the payload downstream.
        deleteNotNeedColumns(dataJsonObj, tp.getSinkColumns());

        // Carry the record's ts value along on the payload.
        Long ts = value.getLong("ts");
        dataJsonObj.put("ts", ts);
        out.collect(Tuple2.of(dataJsonObj, tp));
    }

    /**
     * Removes from {@code dataJsonObj}, in place, every field whose name is not listed in
     * {@code sinkColumns}.
     *
     * @param dataJsonObj the payload to prune
     * @param sinkColumns comma-separated list of field names to keep
     */
    private static void deleteNotNeedColumns(JSONObject dataJsonObj, String sinkColumns) {
        Set<String> columnsToKeep = new HashSet<>(Arrays.asList(sinkColumns.split(",")));
        // Negated on purpose: entries NOT in the sink column list are the ones to drop.
        dataJsonObj.entrySet().removeIf(entry -> !columnsToKeep.contains(entry.getKey()));
    }

    /**
     * Handles one element of the broadcast configuration stream.
     *
     * <p>When the element's op is {@code "d"} the entry is removed from both the broadcast
     * state and {@code configMap}; for any other op the entry is inserted or overwritten in
     * both.
     *
     * @param tp  the configuration entry carried by the broadcast stream
     * @param ctx context giving write access to the broadcast state
     * @param out unused; configuration elements produce no output
     * @throws Exception if the broadcast state cannot be updated
     */
    @Override
    public void processBroadcastElement(TableProcessDwd tp, BroadcastProcessFunction<JSONObject, TableProcessDwd, Tuple2<JSONObject, TableProcessDwd>>.Context ctx, Collector<Tuple2<JSONObject, TableProcessDwd>> out) throws Exception {
        String op = tp.getOp();
        BroadcastState<String, TableProcessDwd> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = getKey(tp.getSourceTable(), tp.getSourceType());

        if ("d".equals(op)) {
            // Configuration row deleted: forget it everywhere.
            broadcastState.remove(key);
            configMap.remove(key);
        } else {
            // Any other op: store the latest version of the entry.
            broadcastState.put(key, tp);
            configMap.put(key, tp);
        }
    }
}
